package cn.com.bonc.app;

import cn.com.bonc.conf.ConfigurationManager;
import cn.com.bonc.constant.Constants;
import cn.com.bonc.domain.UserInfo;
import com.alibaba.fastjson.JSONObject;
import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.*;
import org.apache.spark.sql.streaming.StreamingQuery;
import org.apache.spark.sql.streaming.StreamingQueryException;
import org.apache.spark.sql.types.MetadataBuilder;
import org.apache.spark.sql.types.StructField;
import org.apache.spark.sql.types.StructType;

import java.util.Arrays;
import java.util.Collections;
import java.util.List;

import static org.apache.spark.sql.functions.col;
import static org.apache.spark.sql.functions.from_json;
import static org.apache.spark.sql.types.DataTypes.StringType;

/**
 * Created by ${RQL} on 2019/1/2
 */
public class StructuredStreamingGroupBy {

    /*
     * Reads pipe-delimited user records from Kafka, converts each record to a
     * JSON document, counts phone numbers grouped by their first three digits,
     * and prints the running per-prefix counts to the console.
     *
     * Expected record layout: userName|certID|address|phoneID|mail|unionID
     */

    public static void main(String[] args) {

        SparkSession spark = SparkSession.builder()
                .appName("StructuredStreamingFromKafka")
                .master("local[4]")
                .getOrCreate();

        // Source: Kafka topic(s) from configuration, replayed from the earliest offsets.
        Dataset<Row> line = spark
                .readStream()
                .format("kafka")
                .option("kafka.bootstrap.servers", ConfigurationManager.getProperty(Constants.KAFKA_BOOTSTRAP_SERVERS))
                .option("subscribe", ConfigurationManager.getProperty(Constants.KAFKA_TOPICS))
                .option("startingOffsets", "earliest")
                .load();

        // Kafka delivers the payload as bytes; cast the value column to a string.
        Dataset<String> dataset = line.selectExpr("CAST(value AS STRING)").as(Encoders.STRING());

        // Parse each "|"-delimited record into a UserInfo and serialize it as JSON.
        // flatMap (instead of map) lets us silently drop malformed records with
        // fewer than 6 fields; a map with unchecked list.get(i) would throw
        // IndexOutOfBoundsException and kill the whole streaming query.
        Dataset<String> jsonDataset = dataset.flatMap(
                (FlatMapFunction<String, String>) record -> {
                    String[] parts = record.split("[|]");
                    if (parts.length < 6) {
                        return Collections.emptyIterator();
                    }
                    UserInfo userInfo = new UserInfo();
                    userInfo.setUserName(parts[0]);
                    userInfo.setCertID(parts[1]);
                    userInfo.setAddress(parts[2]);
                    userInfo.setPhoneID(parts[3]);
                    userInfo.setMail(parts[4]);
                    userInfo.setUnionID(parts[5]);
                    return Collections.singletonList(JSONObject.toJSONString(userInfo)).iterator();
                },
                Encoders.STRING());

        // Schema used by from_json to turn the JSON payload back into columns.
        // All fields are nullable strings with empty metadata.
        StructType type = new StructType()
                .add("userName", StringType)
                .add("certID", StringType)
                .add("address", StringType)
                .add("phoneID", StringType)
                .add("mail", StringType)
                .add("unionID", StringType);

        Dataset<Row> dataFrame = jsonDataset
                .select(from_json(col("value"), type).as("use"))
                .selectExpr("use.userName",
                        "use.certID",
                        "use.phoneID",
                        "use.unionID",
                        "use.mail",
                        "use.address"
                );

        // Register a temp view so the aggregation can be written in SQL.
        // createOrReplaceTempView throws no checked exception and is idempotent,
        // unlike createTempView which fails if the view already exists.
        dataFrame.createOrReplaceTempView("info");

        // Count records per 3-character phone prefix. Note: in Spark SQL,
        // substr(col, 0, 3) behaves like substr(col, 1, 3) — first 3 chars.
        Dataset<Row> groupByCount = spark.sql(" select phoneID,count(phoneID) sum from (select substr(phoneID,0,3) phoneID from info)tmp group by phoneID");

        // Print the derived schema for debugging.
        System.out.println("-------------输出shema信息");
        dataFrame.printSchema();

        // Streaming aggregations written to the console sink require
        // "complete" output mode (the full updated result table each trigger).
        StreamingQuery query = groupByCount.writeStream()
                .outputMode("complete")
                .format("console")
                .start();

        try {
            // Block until the query terminates; surfaces any failure that
            // occurred inside the streaming query (e.g. a task exception).
            query.awaitTermination();
        } catch (StreamingQueryException e) {
            e.printStackTrace();
        }
    }

}


